import { ClientDuplexStream } from '@grpc/grpc-js';
import { TextDecoder, TextEncoder } from 'util';
import { injectable, inject, named } from 'inversify';
import { Struct } from 'google-protobuf/google/protobuf/struct_pb';
import { Emitter } from '@theia/core/lib/common/event';
import { ILogger } from '@theia/core/lib/common/logger';
import {
    MonitorService,
    MonitorServiceClient,
    MonitorConfig,
    MonitorError,
    Status,
} from '../../common/protocol/monitor-service';
import {
    StreamingOpenRequest,
    StreamingOpenResponse,
    MonitorConfig as GrpcMonitorConfig,
} from '../cli-protocol/cc/arduino/cli/monitor/v1/monitor_pb';
import { MonitorClientProvider } from './monitor-client-provider';
import { Board, Port } from '../../common/protocol/boards-service';

interface ErrorWithCode extends Error {
    readonly code: number;
}
namespace ErrorWithCode {
    export function toMonitorError(
        error: Error,
        config: MonitorConfig
    ): MonitorError {
        const { message } = error;
        let code = undefined;
        if (is(error)) {
            // TODO: const `mapping`. Use regex for the `message`.
            const mapping = new Map<string, number>();
            mapping.set(
                '1 CANCELLED: Cancelled on client',
                MonitorError.ErrorCodes.CLIENT_CANCEL
            );
            mapping.set(
                '2 UNKNOWN: device not configured',
                MonitorError.ErrorCodes.DEVICE_NOT_CONFIGURED
            );
            mapping.set(
                '2 UNKNOWN: error opening serial monitor: Serial port busy',
                MonitorError.ErrorCodes.DEVICE_BUSY
            );
            code = mapping.get(message);
        }
        return {
            message,
            code,
            config,
        };
    }
    function is(error: Error & { code?: number }): error is ErrorWithCode {
        return typeof error.code === 'number';
    }
}

@injectable()
export class MonitorServiceImpl implements MonitorService {
    @inject(ILogger)
    @named('monitor-service')
    protected readonly logger: ILogger;

    @inject(MonitorClientProvider)
    protected readonly monitorClientProvider: MonitorClientProvider;

    protected client?: MonitorServiceClient;
    protected connection?: {
        duplex: ClientDuplexStream<StreamingOpenRequest, StreamingOpenResponse>;
        config: MonitorConfig;
    };
    protected messages: string[] = [];
    protected onMessageDidReadEmitter = new Emitter<void>();

    setClient(client: MonitorServiceClient | undefined): void {
        this.client = client;
    }

    dispose(): void {
        this.logger.info('>>> Disposing monitor service...');
        if (this.connection) {
            this.disconnect();
        }
        this.logger.info('<<< Disposed monitor service.');
        this.client = undefined;
    }

    async connect(config: MonitorConfig): Promise<Status> {
        this.logger.info(
            `>>> Creating serial monitor connection for ${Board.toString(
                config.board
            )} on port ${Port.toString(config.port)}...`
        );
        if (this.connection) {
            return Status.ALREADY_CONNECTED;
        }
        const client = await this.monitorClientProvider.client();
        if (!client) {
            return Status.NOT_CONNECTED;
        }
        if (client instanceof Error) {
            return { message: client.message };
        }
        const duplex = client.streamingOpen();
        this.connection = { duplex, config };

        duplex.on(
            'error',
            ((error: Error) => {
                const monitorError = ErrorWithCode.toMonitorError(
                    error,
                    config
                );
                this.disconnect(monitorError).then(() => {
                    if (this.client) {
                        this.client.notifyError(monitorError);
                    }
                    if (monitorError.code === undefined) {
                        // Log the original, unexpected error.
                        this.logger.error(error);
                    }
                });
            }).bind(this)
        );

        duplex.on(
            'data',
            ((resp: StreamingOpenResponse) => {
                const raw = resp.getData();
                const message =
                    typeof raw === 'string'
                        ? raw
                        : new TextDecoder('utf8').decode(raw);
                this.messages.push(message);
                this.onMessageDidReadEmitter.fire();
            }).bind(this)
        );

        const { type, port } = config;
        const req = new StreamingOpenRequest();
        const monitorConfig = new GrpcMonitorConfig();
        monitorConfig.setType(this.mapType(type));
        monitorConfig.setTarget(port.address);
        if (config.baudRate !== undefined) {
            monitorConfig.setAdditionalConfig(
                Struct.fromJavaScript({ BaudRate: config.baudRate })
            );
        }
        req.setConfig(monitorConfig);

        return new Promise<Status>((resolve) => {
            if (this.connection) {
                this.connection.duplex.write(req, () => {
                    this.logger.info(
                        `<<< Serial monitor connection created for ${Board.toString(
                            config.board,
                            { useFqbn: false }
                        )} on port ${Port.toString(config.port)}.`
                    );
                    resolve(Status.OK);
                });
                return;
            }
            this.disconnect().then(() => resolve(Status.NOT_CONNECTED));
        });
    }

    async disconnect(reason?: MonitorError): Promise<Status> {
        try {
            if (
                !this.connection &&
                reason &&
                reason.code === MonitorError.ErrorCodes.CLIENT_CANCEL
            ) {
                return Status.OK;
            }
            this.logger.info('>>> Disposing monitor connection...');
            if (!this.connection) {
                this.logger.warn('<<< Not connected. Nothing to dispose.');
                return Status.NOT_CONNECTED;
            }
            const { duplex, config } = this.connection;
            duplex.cancel();
            this.logger.info(
                `<<< Disposed monitor connection for ${Board.toString(
                    config.board,
                    { useFqbn: false }
                )} on port ${Port.toString(config.port)}.`
            );
            this.connection = undefined;
            return Status.OK;
        } finally {
            this.messages.length = 0;
        }
    }

    async send(message: string): Promise<Status> {
        if (!this.connection) {
            return Status.NOT_CONNECTED;
        }
        const req = new StreamingOpenRequest();
        req.setData(new TextEncoder().encode(message));
        return new Promise<Status>((resolve) => {
            if (this.connection) {
                this.connection.duplex.write(req, () => {
                    resolve(Status.OK);
                });
                return;
            }
            this.disconnect().then(() => resolve(Status.NOT_CONNECTED));
        });
    }

    async request(): Promise<{ message: string }> {
        const message = this.messages.shift();
        if (message) {
            return { message };
        }
        return new Promise<{ message: string }>((resolve) => {
            const toDispose = this.onMessageDidReadEmitter.event(() => {
                toDispose.dispose();
                resolve(this.request());
            });
        });
    }

    protected mapType(
        type?: MonitorConfig.ConnectionType
    ): GrpcMonitorConfig.TargetType {
        switch (type) {
            case MonitorConfig.ConnectionType.SERIAL:
                return GrpcMonitorConfig.TargetType.TARGET_TYPE_SERIAL;
            default:
                return GrpcMonitorConfig.TargetType.TARGET_TYPE_SERIAL;
        }
    }
}
